package elastic.learn.transport.index.multidocument.curd;

import elastic.learn.transport.ElasticsearchConfig;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.concurrent.TimeUnit;

public class BulkProcessorUse {
    public static void main(String[] args) throws InterruptedException {
        TransportClient client = ElasticsearchConfig.getElasticsearchClient();
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                System.out.println(bulkRequest.numberOfActions());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                System.out.println("success " + bulkResponse.hasFailures());
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                System.out.println("failed " + throwable.getMessage());
            }
        })
                .setBulkActions(10000)
                .setBulkSize(new ByteSizeValue(4, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
        String json = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(json, XContentType.JSON));
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
//        bulkProcessor.flush();
        bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    }
}
